package com.fintech.pangu.rocketmq.autoconfigure;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.hook.SendMessageHook;
import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.fintech.pangu.rocketmq.annotation.EnableRocketMQ;
import com.fintech.pangu.rocketmq.annotation.RocketMQMessageListener;
import com.fintech.pangu.rocketmq.core.consumer.DefaultRocketMQListenerContainer;
import com.fintech.pangu.rocketmq.core.consumer.RocketMQListener;
import com.fintech.pangu.rocketmq.core.producer.RocketMQTemplate;
import com.fintech.pangu.rocketmq.core.producer.hook.SendMessageMetricHookImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
 * RocketMQ自动加载类
 *
 * @author dell
 * @since 1.0.0
 */
@Configuration
@ConditionalOnClass(RocketMQTemplate.class)
@ConditionalOnBean(annotation = EnableRocketMQ.class)
@ConditionalOnProperty(prefix = "pangu.rocketmq", name = "enable", matchIfMissing = true)
@EnableConfigurationProperties(RocketMQProperties.class)
public class RocketMQAutoConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);

    /**
     * 用于Producer的发送消息钩子
     */
    @Autowired(required = false)
    private List<SendMessageHook> sendMessageHooks;

    /**
     * 创建生产者Producer
     *
     * @param rocketMQProperties mq属性对象
     * @return 默认的MQProducer对象
     */
    @Bean
    @ConditionalOnClass(DefaultMQProducer.class)
    @ConditionalOnMissingBean(DefaultMQProducer.class)
    public DefaultMQProducer rocketmqProducer(RocketMQProperties rocketMQProperties) {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        //生产组
        String groupName = producerConfig != null ? producerConfig.getGroup() : "";

        /**
         * 校验：
         *   nameServer不能为空
         *   producer.group不能为空
         */
        Assert.hasText(rocketMQProperties.getNameServer(), "[pangu.rocketmq.name-server] 不能为空");
        if (!StringUtils.hasText(groupName)) {
            throw new IllegalArgumentException("[pangu.rocketmq.producer.group] 不能为空");
        }

        //创建Producer
        DefaultMQProducer rocketmqProducer = new DefaultMQProducer(groupName);
        //nameServer
        rocketmqProducer.setNamesrvAddr(rocketMQProperties.getNameServer());
        //是否启用vip通道，默认值false
        rocketmqProducer.setVipChannelEnabled(producerConfig.isVipChannelEnabled());
        //发送消息超时时间，单位毫秒，默认值3000
        rocketmqProducer.setSendMsgTimeout(producerConfig.getSendMsgTimeout());
        rocketmqProducer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
        rocketmqProducer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
        //消息体最大值，单位byte，默认4Mb
        rocketmqProducer.setMaxMessageSize(producerConfig.getMaxMessageSize());
        //压缩消息体的阀值，默认1024 * 4，4k，即默认大于4k的消息体将开启压缩
        rocketmqProducer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMsgBodyOverHowmuch());
        //内部发送失败时是否重试另一个broker
        rocketmqProducer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryAnotherBrokerWhenNotStoreOk());

        // 注册自定义SendMessageHook
        if(sendMessageHooks!=null && sendMessageHooks.size()>0){
            DefaultMQProducerImpl defaultMQProducerImpl = rocketmqProducer.getDefaultMQProducerImpl();

            for(SendMessageHook sendMessageHook : sendMessageHooks){
                defaultMQProducerImpl.registerSendMessageHook(sendMessageHook);
            }
        }

        logger.info("默认rocketmq生产端初始化完成： " + rocketmqProducer);

        return rocketmqProducer;
    }


    /**
     * 创建RocketMQTemplate
     *
     * @param producer
     * @return
     */
    @Bean(destroyMethod = "destroy")
    @ConditionalOnBean(DefaultMQProducer.class)
    @ConditionalOnMissingBean(RocketMQTemplate.class)
    public RocketMQTemplate rocketMQTemplate(DefaultMQProducer producer) {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        rocketMQTemplate.setProducer(producer);

        return rocketMQTemplate;
    }


    /**
     * 消费者的RocketMQListener相关配置
     */
    @Configuration
    @ConditionalOnClass(DefaultMQPushConsumer.class)
    @EnableConfigurationProperties(RocketMQProperties.class)
    @ConditionalOnProperty(prefix = "pangu.rocketmq.consumer", name = "enable", matchIfMissing = true)
    public static class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

        private ConfigurableApplicationContext applicationContext;

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        }

        @Autowired
        private RocketMQProperties rocketMQProperties;

        @Autowired
        private StandardEnvironment environment;

        //生成container beanName时的计数器
        private AtomicLong counter = new AtomicLong(0);


        /**
         * 实现该接口后，当所有单例 bean 都初始化完成以后， 容器会回调该接口的方法 afterSingletonsInstantiated
         * 主要应用场合就是在所有单例 bean 创建完成之后，可以在该回调中做一些事情
         */
        @Override
        public void afterSingletonsInstantiated() {
            init();
        }

        private void init() {
            Assert.hasText(rocketMQProperties.getNameServer(), "[pangu.rocketmq.name-server] must not be null");

            //获取所有使用了@RocketMQMessageListener注解的spring容器中的bean
            Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

            logger.info("开始扫描加载所有rocketmq消费端容器实例, 总个数: {} 个", beans.size());
            //迭代并调用registerContainer()，向spring中注册RocketMQListenerContainer
            if (Objects.nonNull(beans)) {
                beans.forEach(this::registerContainer);
            }

            logger.info("完成扫描加载所有rocketmq消费端容器实例, 总个数: {} 个, 成功加载: {} 个", beans.size(), counter);
        }

        /**
         * 使用@RocketMQMessageListener的bean，向spring容器中注册RocketMQListenerContainer
         *
         * @param beanName 实例名称
         * @param bean     实例对象
         */
        private void registerContainer(String beanName, Object bean) {
            //获取bean的Class
            Class<?> clazz = AopUtils.getTargetClass(bean);

            //判断clazz是不是RocketMQListener接口类型的
            if (!RocketMQListener.class.isAssignableFrom(clazz)) {
                throw new IllegalStateException(clazz + " 不是实例 " + RocketMQListener.class.getName());
            }

            //获取RocketMQListener接口实现类上的注解RocketMQMessageListener
            RocketMQListener rocketMQListener = (RocketMQListener) bean;
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            String consumerGroup = environment.resolvePlaceholders(annotation.consumerGroup());

            if (!annotation.enable()) {
                logger.warn("rocketmq消费端容器实例[消费组名={}]默认未开启", consumerGroup);
                return;
            }

            /**
             * BeanDefinition的builder，用于创建DefaultRocketMQListenerContainer的BeanDefinition
             * environment.resolvePlaceholders()用于处理${}占位符的问题，即@RocketMQMessageListener注解上的属性部分可以使用${}占位符，从配置文件获取
             */
            BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMQListenerContainer.class);

            // 允许传入自定义的rocketmq 的nameserver地址，适配连接多个不同的mq的情况
            String nameServer = environment.resolvePlaceholders(annotation.nameServer());
            nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;

            beanBuilder.addPropertyValue("nameServer", nameServer);
            beanBuilder.addPropertyValue("topic", environment.resolvePlaceholders(annotation.topic()));
            beanBuilder.addPropertyValue("consumerGroup", consumerGroup);
            //CONCURRENTLY 或 ORDERLY
            beanBuilder.addPropertyValue("consumeMode", annotation.consumeMode());
            //最小消费线程数，默认20
            beanBuilder.addPropertyValue("consumeThreadMin", Integer.valueOf(environment.resolvePlaceholders(annotation.consumeThreadMin())));
            //最大消费线程数，默认64
            beanBuilder.addPropertyValue("consumeThreadMax", Integer.valueOf(environment.resolvePlaceholders(annotation.consumeThreadMax())));
            //最大批量消费大小，默认1
            beanBuilder.addPropertyValue("consumeMessageBatchMaxSize", Integer.valueOf(environment.resolvePlaceholders(annotation.consumeMessageBatchMaxSize())));
            //最大重复消费次数，默认3
            beanBuilder.addPropertyValue("maxReconsumeTime", Integer.valueOf(environment.resolvePlaceholders(annotation.maxReconsumeTime())));
            //CLUSTERING 或 BROADCASTING
            beanBuilder.addPropertyValue("messageModel", annotation.messageModel());
            //过滤类型，只有TAG
            beanBuilder.addPropertyValue("selectorType", annotation.selectorType());
            //过滤表达式
            beanBuilder.addPropertyValue("selectorExpress", environment.resolvePlaceholders(annotation.selectorExpress()));
            //rocketMQListener实现类的实例
            beanBuilder.addPropertyValue("rocketMQListener", rocketMQListener);
            beanBuilder.setDestroyMethodName("destroy");

            /**
             * 创建bean的定义BeanDefinition，并给bean起名，之后注册到beanFactory
             * 过程类似于在xml中配置
             */
            String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
            DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();

            beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());

            //获取名为containerBeanName的DefaultRocketMQListenerContainer实例
            DefaultRocketMQListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);

            //启动container
            if (!container.isStarted()) {
                try {
                    container.start();
                } catch (Exception e) {
                    logger.error("rocketmq消费端容器[{}] 启动失败. {}", consumerGroup, container, e);
                    throw new RuntimeException(e);
                }
            }

            logger.info("注册rocketmq监听器到消费端容器成功, 消费组名:{}, 消息监听器实例名:{}, 消费端容器实例名:{}", consumerGroup, beanName, containerBeanName);
        }
    }


    /**
     * 消息发送Hook，默认使用NoOpRocketMQMetricsCollector-空收集器
     * @return
     */
    @Bean
    public SendMessageHook sendMessageMetricHook(){
        return new SendMessageMetricHookImpl();
    }



}
